{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Feature Transformation with Amazon a SageMaker Processing Job and Scikit-Learn\n",
    "\n",
    "Typically a machine learning (ML) process consists of few steps. First, gathering data with various ETL jobs, then pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.\n",
    "\n",
    "Often, distributed data processing frameworks such as Scikit-Learn are used to pre-process data sets in order to prepare them for training. In this notebook we'll use Amazon SageMaker Processing, and leverage the power of Scikit-Learn in a managed SageMaker environment to run our processing workload."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![](img/prepare_dataset.png)\n",
    "\n",
    "![](img/processing.jpg)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Contents\n",
    "\n",
    "1. Setup Environment\n",
    "1. Setup Input Data\n",
    "1. Setup Output Data\n",
    "1. Build a Spark container for running the processing job\n",
    "1. Run the Processing Job using Amazon SageMaker\n",
    "1. Inspect the Processed Output Data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Environment\n",
    "\n",
    "Let's start by specifying:\n",
    "* The S3 bucket and prefixes that you use for training and model data. Use the default bucket specified by the Amazon SageMaker session.\n",
    "* The IAM role ARN used to give processing and training access to the dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied: boto3 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (1.12.20)\n",
      "Requirement already satisfied: s3transfer<0.4.0,>=0.3.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3) (0.3.3)\n",
      "Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3) (0.9.4)\n",
      "Requirement already satisfied: botocore<1.16.0,>=1.15.20 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3) (1.15.20)\n",
      "Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore<1.16.0,>=1.15.20->boto3) (2.7.3)\n",
      "Requirement already satisfied: docutils<0.16,>=0.10 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore<1.16.0,>=1.15.20->boto3) (0.14)\n",
      "Requirement already satisfied: urllib3<1.26,>=1.20; python_version != \"3.4\" in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore<1.16.0,>=1.15.20->boto3) (1.23)\n",
      "Requirement already satisfied: six>=1.5 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.16.0,>=1.15.20->boto3) (1.11.0)\n",
      "\u001b[33mYou are using pip version 10.0.1, however version 20.0.2 is available.\n",
      "You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "!pip install boto3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sagemaker\n",
    "from time import gmtime, strftime\n",
    "import boto3\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "role = sagemaker.get_execution_role()\n",
    "bucket = sagemaker_session.default_bucket()\n",
    "region = boto3.Session().region_name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Input Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "s3://sagemaker-us-east-1-835319576252/amazon-reviews-pds/tsv/\n"
     ]
    }
   ],
   "source": [
    "# Inputs\n",
    "s3_input_data = 's3://{}/amazon-reviews-pds/tsv/'.format(bucket)\n",
    "print(s3_input_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2020-03-21 17:59:04   18997559 amazon_reviews_us_Digital_Software_v1_00.tsv.gz\r\n",
      "2020-03-21 17:59:05   27442648 amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz\r\n"
     ]
    }
   ],
   "source": [
    "!aws s3 ls $s3_input_data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup Output Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Processing job name:  amazon-reviews-scikit-processor-2020-03-22-21-04-18\n"
     ]
    }
   ],
   "source": [
    "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n",
    "\n",
    "output_prefix = 'amazon-reviews-scikit-processor-{}'.format(timestamp_prefix)\n",
    "scikit_processing_job_name = 'amazon-reviews-scikit-processor-{}'.format(timestamp_prefix)\n",
    "\n",
    "print('Processing job name:  {}'.format(scikit_processing_job_name))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Run the Processing Job using Amazon SageMaker\n",
    "\n",
    "Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the Spark container that was just built, and a SparkML script for processing in the job configuration."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Review the Spark processing script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "import argparse\r\n",
      "import json\r\n",
      "import os\r\n",
      "import pandas as pd\r\n",
      "import numpy as np\r\n",
      "import csv\r\n",
      "import glob\r\n",
      "from pathlib import Path\r\n",
      "\r\n",
      "# requirements.txt\r\n",
      "import subprocess\r\n",
      "import sys\r\n",
      "subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt'])\r\n",
      "import torch\r\n",
      "import transformers as ppb\r\n",
      "import sklearn\r\n",
      "from sklearn.model_selection import train_test_split\r\n",
      "\r\n",
      "def list_arg(raw_value):\r\n",
      "    \"\"\"argparse type for a list of strings\"\"\"\r\n",
      "    return str(raw_value).split(',')\r\n",
      "\r\n",
      "\r\n",
      "def parse_args():\r\n",
      "    # Unlike SageMaker training jobs (which have `SM_HOSTS` and `SM_CURRENT_HOST` env vars), processing jobs to need to parse the resource config file directly\r\n",
      "    resconfig = {}\r\n",
      "    try:\r\n",
      "        with open('/opt/ml/config/resourceconfig.json', 'r') as cfgfile:\r\n",
      "            resconfig = json.load(cfgfile)\r\n",
      "    except FileNotFoundError:\r\n",
      "        print('/opt/ml/config/resourceconfig.json not found.  current_host is unknown.')\r\n",
      "        pass # Ignore\r\n",
      "\r\n",
      "    # Local testing with CLI args\r\n",
      "    parser = argparse.ArgumentParser(description='Process')\r\n",
      "\r\n",
      "    parser.add_argument('--hosts', type=list_arg,\r\n",
      "        default=resconfig.get('hosts', ['unknown']),\r\n",
      "        help='Comma-separated list of host names running the job'\r\n",
      "    )\r\n",
      "    parser.add_argument('--current-host', type=str,\r\n",
      "        default=resconfig.get('current_host', 'unknown'),\r\n",
      "        help='Name of this host running the job'\r\n",
      "    )\r\n",
      "    parser.add_argument('--input-data', type=str,\r\n",
      "        default='/opt/ml/processing/input/data',\r\n",
      "    )\r\n",
      "    parser.add_argument('--output-data', type=str,\r\n",
      "        default='/opt/ml/processing/output',\r\n",
      "    )\r\n",
      "    return parser.parse_args()\r\n",
      "\r\n",
      "\r\n",
      "def process(args):\r\n",
      "    print('Current host: {}'.format(args.current_host))\r\n",
      "\r\n",
      "#    print('Listing contents of {}'.format(args.input_data))\r\n",
      "#    dirs_input = os.listdir(args.input_data)\r\n",
      "\r\n",
      "    train_data = None\r\n",
      "    validation_data = None\r\n",
      "    test_data = None\r\n",
      "\r\n",
      "    # This would print all the files and directories\r\n",
      "    for file in glob.glob('{}/*.tsv.gz'.format(args.input_data)):\r\n",
      "        print(file)\r\n",
      "\r\n",
      "        filename_without_extension = Path(Path(file).stem).stem\r\n",
      "        \r\n",
      "        df = pd.read_csv(file, \r\n",
      "                         delimiter='\\t', \r\n",
      "                         quoting=csv.QUOTE_NONE,\r\n",
      "                         compression='gzip')\r\n",
      "        df.shape\r\n",
      "        df.head(5)\r\n",
      "\r\n",
      "        df.isna().values.any()\r\n",
      "\r\n",
      "        df = df.dropna()\r\n",
      "        df = df.reset_index(drop=True)\r\n",
      "        df.shape\r\n",
      "\r\n",
      "        df['is_positive_sentiment'] = (df['star_rating'] >= 4).astype(int)\r\n",
      "        df.shape\r\n",
      "\r\n",
      "        ###########\r\n",
      "        # TODO:  increase batch size and run through all the data\r\n",
      "        ###########        \r\n",
      "\r\n",
      "        batch_1 = df[['review_body', 'is_positive_sentiment']][:100]\r\n",
      "        batch_1.shape\r\n",
      "        batch_1.head(5)\r\n",
      "\r\n",
      "        # ## Loading the Pre-trained BERT model\r\n",
      "        # Let's now load a pre-trained BERT model. \r\n",
      "\r\n",
      "        # For DistilBERT (lightweight for a notebook like this):\r\n",
      "        #model_class, tokenizer_class, pretrained_weights = (ppb.DistilBertModel, ppb.DistilBertTokenizer, 'distilbert-base-uncased')\r\n",
      "\r\n",
      "        # For Bert (requires a lot more memory):\r\n",
      "        model_class, tokenizer_class, pretrained_weights = (ppb.BertModel, ppb.BertTokenizer, 'bert-base-uncased')\r\n",
      "\r\n",
      "        # Load pretrained model/tokenizer\r\n",
      "        tokenizer = tokenizer_class.from_pretrained(pretrained_weights)\r\n",
      "        model = model_class.from_pretrained(pretrained_weights)\r\n",
      "\r\n",
      "        # Right now, the variable `model` holds a pretrained BERT or distilBERT model (a version of BERT that is smaller, but much faster and requiring a lot less memory.)\r\n",
      "        # \r\n",
      "        # ## Preparing the Dataset\r\n",
      "        # Before we can hand our sentences to BERT, we need to so some minimal processing to put them in the format it requires.\r\n",
      "        # \r\n",
      "        # ### Tokenization\r\n",
      "        # Our first step is to tokenize the sentences -- break them up into word and subwords in the format BERT is comfortable with.\r\n",
      "        tokenized = batch_1['review_body'].apply((lambda x: tokenizer.encode(x, add_special_tokens=True)))\r\n",
      "\r\n",
      "        # ### Padding\r\n",
      "        # After tokenization, `tokenized` is a list of sentences -- each sentences is represented as a list of tokens. We want BERT to process our examples all at once (as one batch). It's just faster that way. For that reason, we need to pad all lists to the same size, so we can represent the input as one 2-d array, rather than a list of lists (of different lengths).\r\n",
      "\r\n",
      "        max_len = 0\r\n",
      "        for i in tokenized.values:\r\n",
      "            if len(i) > max_len:\r\n",
      "                max_len = len(i)\r\n",
      "\r\n",
      "        padded = np.array([i + [0]*(max_len-len(i)) for i in tokenized.values])\r\n",
      "\r\n",
      "        # Our dataset is now in the `padded` variable, we can view its dimensions below:\r\n",
      "        np.array(padded).shape\r\n",
      "\r\n",
      "        # ### Masking\r\n",
      "        # If we directly send `padded` to BERT, that would slightly confuse it. We need to create another variable to tell it to ignore (mask) the padding we've added when it's processing its input. That's what attention_mask is:\r\n",
      "        attention_mask = np.where(padded != 0, 1, 0)\r\n",
      "        attention_mask.shape\r\n",
      "\r\n",
      "        # The `model()` function runs our sentences through BERT. The results of the processing will be returned into `last_hidden_states`.\r\n",
      "\r\n",
      "        input_ids = torch.tensor(padded)  \r\n",
      "        attention_mask = torch.tensor(attention_mask)\r\n",
      "\r\n",
      "        with torch.no_grad():\r\n",
      "            last_hidden_states = model(input_ids, attention_mask=attention_mask)\r\n",
      "\r\n",
      "        features = last_hidden_states[0][:,0,:].numpy()\r\n",
      "        print(features)\r\n",
      "        print(type(features))\r\n",
      "\r\n",
      "        labels = batch_1['is_positive_sentiment']\r\n",
      "        print(labels)\r\n",
      "        print(type(labels))\r\n",
      "        \r\n",
      "        # TODO: Merge features and labels for our purpose here\r\n",
      "        train_features, test_features, train_labels, test_labels = train_test_split(features, labels, stratify=batch_1['is_positive_sentiment'])\r\n",
      "\r\n",
      "#        # Split all data into 90% train and 10% holdout\r\n",
      "#        train_features, holdout_features, train_labels, holdout_labels = train_test_split(features, labels, stratify=batch_1['is_positive_sentiment'])\r\n",
      "#        # Split the holdout into 50% validation and 50% test\r\n",
      "#        validation_features, test_features, validation_labels, test_labels = train_test_split(holdout_features, holdout_labels, stratify=batch_1['is_positive_sentiment'])\r\n",
      "      \r\n",
      "        train_features.shape\r\n",
      "        print(train_features)\r\n",
      "        print(type(train_features))\r\n",
      "        df_train_features = pd.DataFrame(train_features)\r\n",
      "        df_train_labels = pd.DataFrame(train_labels)\r\n",
      "        df_train = pd.concat([df_train_features, df_train_labels], axis=1)\r\n",
      "        print(df_train)\r\n",
      "\r\n",
      "#        validation_features.shape\r\n",
      "#        print(validation_features)\r\n",
      "#        print(type(validation_features))\r\n",
      "#        df_validation_features = pd.DataFrame(validation_features)\r\n",
      "\r\n",
      "        test_features.shape\r\n",
      "        print(test_features)\r\n",
      "        print(type(test_features))\r\n",
      "        df_test_features = pd.DataFrame(test_features)\r\n",
      "\r\n",
      "        train_output_data = '{}/raw/labeled/split/balanced/header/train'.format(args.output_data)\r\n",
      "        print('Creating directory {}'.format(train_output_data))\r\n",
      "        os.makedirs(train_output_data, exist_ok=True)\r\n",
      "        print('Writing to {}/part-{}-{}.csv'.format(train_output_data, args.current_host, filename_without_extension))\r\n",
      "        df_train.to_csv('{}/part-{}-{}.csv'.format(train_output_data, args.current_host, filename_without_extension), sep=',', index=False, header=None)\r\n",
      "\r\n",
      "#        validation_output_data = '{}/raw/labeled/split/balanced/header/validation'.format(args.output_data)\r\n",
      "#        print('Creating directory {}'.format(validation_output_data))\r\n",
      "#        os.makedirs(validation_output_data, exist_ok=True)\r\n",
      "#        print('Writing to {}/part-{}-{}.csv'.format(validation_output_data, args.current_host, filename_without_extension))     \r\n",
      "#        df_validation_features.to_csv('{}/part-{}-{}.csv'.format(validation_output_data, args.current_host, filename_without_extension), sep=',', index=False, header=None)\r\n",
      "\r\n",
      "        test_output_data = '{}/raw/labeled/split/balanced/header/test'.format(args.output_data)\r\n",
      "        print('Creating directory {}'.format(test_output_data))\r\n",
      "        os.makedirs(test_output_data, exist_ok=True)\r\n",
      "        print('Writing to {}/part-{}-{}.csv'.format(test_output_data, args.current_host, filename_without_extension))\r\n",
      "        df_test_features.to_csv('{}/part-{}-{}.csv'.format(test_output_data, args.current_host, filename_without_extension), sep=',', index=False, header=None)\r\n",
      "        \r\n",
      "\r\n",
      "    print('Listing contents of {}'.format(args.output_data))\r\n",
      "    dirs_output = os.listdir(args.output_data)\r\n",
      "    for file in dirs_output:\r\n",
      "        print(file)\r\n",
      "\r\n",
      "    print('Listing contents of {}'.format(train_data))\r\n",
      "    dirs_output = os.listdir(train_data)\r\n",
      "    for file in dirs_output:\r\n",
      "        print(file)\r\n",
      "\r\n",
      "#    print('Listing contents of {}'.format(validation_data))\r\n",
      "#    dirs_output = os.listdir(validation_data)\r\n",
      "#    for file in dirs_output:\r\n",
      "#        print(file)\r\n",
      "\r\n",
      "    print('Listing contents of {}'.format(test_data))\r\n",
      "    dirs_output = os.listdir(test_data)\r\n",
      "    for file in dirs_output:\r\n",
      "        print(file)\r\n",
      "\r\n",
      "    print('Complete')\r\n",
      "    \r\n",
      "    \r\n",
      "if __name__ == \"__main__\":\r\n",
      "    args = parse_args()\r\n",
      "    print('Loaded arguments:')\r\n",
      "    print(args)\r\n",
      "    \r\n",
      "    print('Environment variables:')\r\n",
      "    print(os.environ)\r\n",
      "\r\n",
      "    process(args)\r\n"
     ]
    }
   ],
   "source": [
    "cat preprocess-scikit-text-to-bert.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run this script as a processing job.  You specify the command (`/opt/program/submit` for this Spark processor.)  You also need to specify one `ProcessingInput` with the `source` argument of the Amazon S3 bucket and `destination` is where the script reads this data from `/opt/ml/processing/input` (inside the Docker container.)  All local paths inside the processing container must begin with `/opt/ml/processing/`.\n",
    "\n",
    "Also give the `run()` method a `ProcessingOutput`, where the `source` is the path the script writes output data to.  For outputs, the `destination` defaults to an S3 bucket that the Amazon SageMaker Python SDK creates for you, following the format `s3://sagemaker-<region>-<account_id>/<processing_job_name>/output/<output_name>/`.  You also give the `ProcessingOutput` value for `output_name`, to make it easier to retrieve these output artifacts after the job is run.\n",
    "\n",
    "The arguments parameter in the `run()` method are command-line arguments in our `preprocess-scikit-text-to-bert.py` script."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sagemaker.sklearn.processing import SKLearnProcessor\n",
    "from sagemaker.processing import ProcessingInput, ProcessingOutput\n",
    "\n",
    "processor = SKLearnProcessor(framework_version='0.20.0',\n",
    "                             role=role,\n",
    "                             instance_type='ml.m5.4xlarge',\n",
    "                             instance_count=2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "Job Name:  sagemaker-scikit-learn-2020-03-22-21-16-53-826\n",
      "Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/amazon-reviews-pds/tsv/', 'LocalPath': '/opt/ml/processing/input/data/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-16-53-826/input/code/preprocess-scikit-text-to-bert.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]\n",
      "Outputs:  [{'OutputName': 'raw-labeled-split-balanced-header-train', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-16-53-826/output/raw-labeled-split-balanced-header-train', 'LocalPath': '/opt/ml/processing/output/raw/labeled/split/balanced/header/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'raw-labeled-split-balanced-header-validation', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-16-53-826/output/raw-labeled-split-balanced-header-validation', 'LocalPath': '/opt/ml/processing/output/raw/labeled/split/balanced/header/validation', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'raw-labeled-split-balanced-header-test', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-16-53-826/output/raw-labeled-split-balanced-header-test', 'LocalPath': '/opt/ml/processing/output/raw/labeled/split/balanced/header/test', 'S3UploadMode': 'EndOfJob'}}]\n"
     ]
    }
   ],
   "source": [
    "# ShardedS3Key to spread the transformations across all nodes\n",
    "processor.run(code='preprocess-scikit-text-to-bert.py',\n",
    "                      inputs=[ProcessingInput(source=s3_input_data,\n",
    "                                              destination='/opt/ml/processing/input/data/',\n",
    "                                              s3_data_distribution_type='ShardedByS3Key')],\n",
    "                      outputs=[\n",
    "                               ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                                output_name='raw-labeled-split-balanced-header-train',\n",
    "                                                source='/opt/ml/processing/output/raw/labeled/split/balanced/header/train'),\n",
    "                               ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                                output_name='raw-labeled-split-balanced-header-validation',\n",
    "                                                source='/opt/ml/processing/output/raw/labeled/split/balanced/header/validation'),\n",
    "                               ProcessingOutput(s3_upload_mode='EndOfJob',\n",
    "                                                output_name='raw-labeled-split-balanced-header-test',\n",
    "                                                source='/opt/ml/processing/output/raw/labeled/split/balanced/header/test'),\n",
    "                      ],\n",
    "                      logs=True,\n",
    "                      wait=False)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<b>Review <a href=\"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/aws/sagemaker/ProcessingJobs;prefix=sagemaker-scikit-learn-2020-03-22-21-16-53-826;streamFilter=typeLogStreamPrefix\">CloudWatch Logs</a> After About 5 Minutes</b>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "scikit_processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']\n",
    "\n",
    "from IPython.core.display import display, HTML\n",
    "display(HTML('<b>Review <a href=\"https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix\">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, scikit_processing_job_name)))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<b>Review <a href=\"https://s3.console.aws.amazon.com/s3/buckets/sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-16-53-826/?region=us-east-1&tab=overview\">S3 Output Data</a> After The Spark Job Has Completed</b>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "from IPython.core.display import display, HTML\n",
    "\n",
    "# Our job writes to `processing_job_name` since we are using ProcessingOutput above\n",
    "scikit_processing_job_s3_output_prefix = scikit_processing_job_name\n",
    "\n",
    "display(HTML('<b>Review <a href=\"https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview\">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, scikit_processing_job_s3_output_prefix, region)))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Please Wait Until the Processing Job Completes\n",
    "Re-run this next cell until the job status shows `Completed`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "InProgress\n",
      "\n",
      "\n",
      "{'ProcessingInputs': [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/amazon-reviews-pds/tsv/', 'LocalPath': '/opt/ml/processing/input/data/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-04-21-521/input/code/preprocess-scikit-text-to-bert.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}], 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'raw-labeled-split-balanced-header-train', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-04-21-521/output/raw-labeled-split-balanced-header-train', 'LocalPath': '/opt/ml/processing/output/raw/labeled/split/balanced/header/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'raw-labeled-split-balanced-header-validation', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-04-21-521/output/raw-labeled-split-balanced-header-validation', 'LocalPath': '/opt/ml/processing/output/raw/labeled/split/balanced/header/validation', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'raw-labeled-split-balanced-header-test', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-04-21-521/output/raw-labeled-split-balanced-header-test', 'LocalPath': '/opt/ml/processing/output/raw/labeled/split/balanced/header/test', 'S3UploadMode': 'EndOfJob'}}]}, 'ProcessingJobName': 'sagemaker-scikit-learn-2020-03-22-21-04-21-521', 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 2, 'InstanceType': 'ml.m5.4xlarge', 'VolumeSizeInGB': 30}}, 'StoppingCondition': {'MaxRuntimeInSeconds': 86400}, 'AppSpecification': {'ImageUri': 
'683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.20.0-cpu-py3', 'ContainerEntrypoint': ['python3', '/opt/ml/processing/input/code/preprocess-scikit-text-to-bert.py']}, 'RoleArn': 'arn:aws:iam::835319576252:role/service-role/AmazonSageMaker-ExecutionRole-20191006T135881', 'ProcessingJobArn': 'arn:aws:sagemaker:us-east-1:835319576252:processing-job/sagemaker-scikit-learn-2020-03-22-21-04-21-521', 'ProcessingJobStatus': 'InProgress', 'LastModifiedTime': datetime.datetime(2020, 3, 22, 21, 4, 22, 11000, tzinfo=tzlocal()), 'CreationTime': datetime.datetime(2020, 3, 22, 21, 4, 21, 811000, tzinfo=tzlocal()), 'ResponseMetadata': {'RequestId': 'f5250c73-60f1-4002-8929-3551fbd1702d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f5250c73-60f1-4002-8929-3551fbd1702d', 'content-type': 'application/x-amz-json-1.1', 'content-length': '2402', 'date': 'Sun, 22 Mar 2020 21:04:23 GMT'}, 'RetryAttempts': 0}}\n"
     ]
    }
   ],
   "source": [
    "running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=scikit_processing_job_name,\n",
    "                                                                            sagemaker_session=sagemaker_session)\n",
    "\n",
    "processing_job_description = running_processor.describe()\n",
    "\n",
    "processing_job_status = processing_job_description['ProcessingJobStatus']\n",
    "print('\\n')\n",
    "print(processing_job_status)\n",
    "print('\\n')\n",
    "\n",
    "print(processing_job_description)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Inspect the Processed Output Data\n",
    "\n",
    "## The next cells will not work properly until the job completes above.\n",
    "\n",
    "Take a look at a few rows of the transformed dataset to make sure the processing was successful."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "s3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-04-21-521/output/raw-labeled-split-balanced-header-train\n",
      "s3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-04-21-521/output/raw-labeled-split-balanced-header-validation\n",
      "s3://sagemaker-us-east-1-835319576252/sagemaker-scikit-learn-2020-03-22-21-04-21-521/output/raw-labeled-split-balanced-header-test\n"
     ]
    }
   ],
   "source": [
    "output_config = processing_job_description['ProcessingOutputConfig']\n",
    "for output in output_config['Outputs']:\n",
    "    if output['OutputName'] == 'raw-labeled-split-balanced-header-train':\n",
    "        processed_balanced_train_data = output['S3Output']['S3Uri']\n",
    "    if output['OutputName'] == 'raw-labeled-split-balanced-header-validation':\n",
    "        processed_balanced_validation_data = output['S3Output']['S3Uri']        \n",
    "    if output['OutputName'] == 'raw-labeled-split-balanced-header-test':\n",
    "        processed_balanced_test_data = output['S3Output']['S3Uri']\n",
    "        \n",
    "print(processed_balanced_train_data)\n",
    "print(processed_balanced_validation_data)\n",
    "print(processed_balanced_test_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $processed_balanced_train_data/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $processed_balanced_validation_data/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 ls $processed_balanced_test_data/"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pass `scikit_processing_job_s3_output_prefix` above as input to the next notebook"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(scikit_processing_job_s3_output_prefix)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store scikit_processing_job_s3_output_prefix\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
